Transformation and Action Examples

Print the max element of an RDD.
val seq = Seq(3,9,2,3,5,4)
val rdd = sc.parallelize(seq,2)
// takeOrdered with a reversed ordering returns the largest element(s)
rdd.takeOrdered(1)(Ordering[Int].reverse)

Find the oldest person by ordering on a case-class field.
case class Person(name:String, age:Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))
val rdd = sc.parallelize(people,2)
//rdd.takeOrdered(1)(Ordering[Int].reverse.on(x=>x.age))
rdd.takeOrdered(1)(Ordering[Int].reverse.on(_.age))

Count the words, then list the top 3 words by count in descending and ascending order.
val rdd1 = sc.parallelize(List(("Hadoop PIG Hive"), ("Hive PIG PIG Hadoop"), ("Hadoop Hadoop Hadoop")))
val rdd2 = rdd1.flatMap(x => x.split(" ")).map(x => (x,1))
val rdd3 = rdd2.reduceByKey((x,y) => (x+y))

rdd3.takeOrdered(3)(Ordering[Int].reverse.on(x=>x._2))
rdd3.takeOrdered(3)(Ordering[Int].on(x=>x._2))

Find the max value along with its associated key.
val myRDD = sc.parallelize(Array(("a",1),("b",5),("c",1),("d",3))).sortBy(_._2,false).take(1)
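Sorting the whole RDD just to pick one record does more work than necessary; a single reduce pass returns the same pair. A minimal sketch over the same data (the variable names here are only for illustration):
val pairs = sc.parallelize(Array(("a",1),("b",5),("c",1),("d",3)))
// keep whichever tuple has the larger value; one pass, no full sort
val maxPair = pairs.reduce((a, b) => if (a._2 >= b._2) a else b)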

How to calculate sum and count in a single groupBy?
val datasets = sc.parallelize(List(("A","HYD",10),("B","BLR",30),("A","HYD",40),("B","BLR",50),("C","DEL",60)))
val df1=datasets.toDF("id","Loc","Amt")
import org.apache.spark.sql.functions._
df1.groupBy($"id").agg(sum($"Amt"), count($"Amt")).show()

Find the max value for each group.
val rdd5=sc.parallelize(List(("v",3),("v", 1),("v", 1),("w", 7),("w", 1),("x", 3),("y", 1),("y", 1),("y", 2),("y", 3)))
// reduceByKey combines values per key within each partition before shuffling
rdd5.reduceByKey(math.max(_, _)).collect

// groupByKey moves every value across the network before taking the max, so prefer reduceByKey for large data
rdd5.groupByKey().map(x => (x._1,x._2.max)).collect
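The same per-group max can be written with the DataFrame API; a rough equivalent of the RDD versions above (the column names are invented for the sketch):
import org.apache.spark.sql.functions._
val df5 = rdd5.toDF("key", "value")
df5.groupBy("key").agg(max("value") as "max_value").show()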

Find the sum of the tuple values.
val x = List(("X45", 2), ("W80", 1), ("F03", 2), ("X61", 2))

val rdd = sc.parallelize(x)
rdd.map(x => x._2).collect   // just the values
rdd.map(x => x._2).sum
rdd.map(_._2).sum            // same result with placeholder syntax

val rdd2 = sc.parallelize(1 to 20)
rdd2.sum
rdd2.max
rdd2.min


Group by two columns and sum in a single aggregation.
val datasets = sc.parallelize(List(("A","HYD",10),("B","BLR",30),("A","HYD",40),("B","BLR",50),("C","DEL",60)))
val df1=datasets.toDF("id","Loc","Amt")
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
df1.groupBy("id","Loc").agg(sum("Amt")).show

Find the average value per key.
val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
// aggregateByKey builds a (runningSum, count) pair for every key
val agg_rdd = rdd.aggregateByKey((0,0))((acc,v) => (acc._1 + v, acc._2 + 1),(a,b) => (a._1 + b._1, a._2 + b._2))
// divide as Double so the average is not truncated by integer division
val avg = agg_rdd.mapValues(x => x._1.toDouble / x._2)
avg.collect
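For comparison, a short sketch of the same per-key average using the DataFrame avg function (the column names are made up here):
import org.apache.spark.sql.functions._
val dfAvg = rdd.toDF("key", "value")
dfAvg.groupBy("key").agg(avg("value") as "avg_value").show()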

Sum both elements of each value pair, then total per key.
val data = Array(("9888wq",(1,2)),("abcd",(1,1)),("abcd",(3,2)),("9888wq",(4,2)))
val rdd= sc.parallelize(data)
val result = rdd.map(x => (x._1,(x._2._1+x._2._2))).reduceByKey((x,y) => x+y)
result.foreach(println)


Keep only the first value for each key.

Input
key1  value1
key1  value2
key2  value3
key3  value4
key3  value5

Output
key1  value1
key2  value3
key3  value4
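
A minimal sketch with the sample data above: reduceByKey keeps one value per key, though which value counts as "first" is only deterministic if the data has not been reshuffled beforehand.
val kv = sc.parallelize(Seq(("key1","value1"),("key1","value2"),("key2","value3"),("key3","value4"),("key3","value5")))
// keep the first value encountered for each key
kv.reduceByKey((a, b) => a).collect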
 
Get all the unique values across the Array elements of an RDD, ignoring the keys. For the sample below, the result is (1, 2, 3, 4, 5, 20, 30, 50, 400).
 
val rdd = sc.parallelize(Seq(
  (100, Array(1,2,3,4,5)),
  (200, Array(1,2,50,20)),
  (300, Array(30,2,400,1))))

val result = rdd.flatMap(_._2).distinct
result.collect
 
Extract the unique domain names (without the top-level domain) from a list of email addresses.
val rdd = sc.parallelize(List(("something1@domainA.com"), 
                              ("something2@domainA.com"), 
                              ("something3@domainB.com")))
 
// split on "@", keep the domain part, then take the portion before the first "."
rdd.map(_.split("@")).flatMap { case Array(_, d) => d.split("\\.").headOption }.distinct.collect
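The same extraction can also be done with DataFrame column functions; a rough sketch, assuming the spark-shell implicits are in scope:
import org.apache.spark.sql.functions._
val emails = Seq("something1@domainA.com", "something2@domainA.com", "something3@domainB.com").toDF("email")
// index into the split results: the part after "@", then the part before the first "."
emails.select(split(split($"email", "@")(1), "\\.")(0) as "domain").distinct.show()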
 
Place male and female rows side by side, matched by row number.
val df = Seq(
  (1, 20, 21),
  (2, 23, 22),
  (1, 26, 23),
  (2, 29, 24)
).toDF("Gender", "Age", "Value")

scala> df.show
+------+---+-----+
|Gender|Age|Value|
+------+---+-----+
|     1| 20|   21|
|     2| 23|   22|
|     1| 26|   23|
|     2| 29|   24|
+------+---+-----+

// Gender 1 = Male
// Gender 2 = Female

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// order by age inside each gender partition so row_number is deterministic
val byGender = Window.partitionBy("gender").orderBy("age")

val males = df
  .filter("gender = 1")
  .select($"age" as "male_age",
          $"value" as "male_value",
          row_number() over byGender as "RN")

scala> males.show
+--------+----------+---+
|male_age|male_value| RN|
+--------+----------+---+
|      20|        21|  1|
|      26|        23|  2|
+--------+----------+---+

val females = df
  .filter("gender = 2")
  .select($"age" as "female_age",
          $"value" as "female_value",
          row_number() over byGender as "RN")

scala> females.show
+----------+------------+---+
|female_age|female_value| RN|
+----------+------------+---+
|        23|          22|  1|
|        29|          24|  2|
+----------+------------+---+

scala> males.join(females, Seq("RN"), "outer").show
+---+--------+----------+----------+------------+
| RN|male_age|male_value|female_age|female_value|
+---+--------+----------+----------+------------+
|  1|      20|        21|        23|          22|
|  2|      26|        23|        29|          24|
+---+--------+----------+----------+------------+
